Hikari连接池2 您所在的位置:网站首页 hikari 数据源 Hikari连接池2

Hikari连接池2

2023-12-17 18:50| 来源: 网络整理| 查看: 265

本文基于 Spring Boot 2.2.7.RELEASE 所依赖的 HikariCP 3.4.3。源码包中的源码与实际 Class 文件反编译出的代码有出入,本文以 Class 反编译代码为准。Hikari 连接池系列共两篇:《Hikari连接池1–初始化连接池》和《Hikari连接池2–获取和归还连接》(本篇)。

3、获取连接 //com.zaxxer.hikari.HikariDataSource#getConnection() public Connection getConnection() throws SQLException { if (isClosed()) { throw new SQLException("HikariDataSource " + this + " has been closed."); } //fastPathPool 非空,则从 fastPathPool 获取连接 if (fastPathPool != null) { return fastPathPool.getConnection(); } // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java HikariPool result = pool; if (result == null) { synchronized (this) { result = pool; if (result == null) { validate(); LOGGER.info("{} - Starting...", getPoolName()); try { //创建新的连接池,代码在前面 pool = result = new HikariPool(this); this.seal(); } catch (PoolInitializationException pie) { if (pie.getCause() instanceof SQLException) { throw (SQLException) pie.getCause(); } else { throw pie; } } LOGGER.info("{} - Start completed.", getPoolName()); } } } //从 pool 中获取连接 return result.getConnection(); }

从连接池中获取连接

//com.zaxxer.hikari.pool.HikariPool#getConnection() public Connection getConnection() throws SQLException { return getConnection(connectionTimeout); } private final ConcurrentBag connectionBag; //这个锁包装了一个 Semaphore private final SuspendResumeLock suspendResumeLock; //com.zaxxer.hikari.pool.HikariPool#getConnection(long) public Connection getConnection(final long hardTimeout) throws SQLException { //获取锁 suspendResumeLock.acquire(); //开始时间 final long startTime = currentTime(); try { //保存超时时间 long timeout = hardTimeout; //时间有剩余,尝试获取连接,这里会一直尝试获取连接,类似于自旋锁 do { //获取poolEntry PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS); //超时未获取到,跳出,获取连接异常 if (poolEntry == null) { break; // We timed out... break and throw exception } final long now = currentTime(); if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) { closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE); timeout = hardTimeout - elapsedMillis(startTime); } else { metricsTracker.recordBorrowStats(poolEntry, startTime); return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now); } } while (timeout > 0L); metricsTracker.recordBorrowTimeoutStats(startTime); throw createTimeoutException(startTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SQLException(poolName + " - Interrupted during connection acquisition", e); } finally { suspendResumeLock.release(); } } 3.1、从ConcurrentBag中获取PoolEntry

当获取不到空闲的 entry 时,当前线程最多会阻塞 timeout 指定的时间;超时后仍未获取到则返回 null,由调用方抛出获取连接超时异常。

private final SynchronousQueue handoffQueue; private final CopyOnWriteArrayList sharedList; private final AtomicInteger waiters; private final ThreadLocal threadList; //com.zaxxer.hikari.util.ConcurrentBag#borrow public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException { // Try the thread-local list first //先从threadlocal中尝试获取 final List list = threadList.get(); for (int i = list.size() - 1; i >= 0; i--) { final Object entry = list.remove(i); @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry; //如果从ThreadLocal中获取到了并且将entry的状态设置为正在使用,则返回 if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } } // Otherwise, scan the shared list ... then poll the handoff queue //ThreadLocal中没有获取到,则从共享列表 sharedList 中尝试获取 //增加等待获取连接的线程计数 final int waiting = waiters.incrementAndGet(); try { //遍历 sharedList for (T bagEntry : sharedList) { //如果成功将空闲的entry设置为正在使用,则返回该entry if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // If we may have stolen another waiter's connection, request another bag add. if (waiting > 1) { listener.addBagItem(waiting - 1); } return bagEntry; } } listener.addBagItem(waiting); //没有获取到bag timeout = timeUnit.toNanos(timeout); do { final long start = currentTime(); //在剩余超时时间内阻塞等待,尝试从 handoffQueue 中接收其他线程交接过来的 entry(例如 requite 归还连接时) final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); //poll 超时(bagEntry 为 null)或成功将 entry 置为正在使用时直接返回;返回 null 时由调用方抛出获取连接超时异常;若 entry 已被其他线程抢占,则扣除已用时间后继续等待 if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000); return null; } finally { //减少等待获取连接的计数 waiters.decrementAndGet(); } } 3.2、创建Connection //com.zaxxer.hikari.pool.PoolEntry#createProxyConnection Connection createProxyConnection(final ProxyLeakTask leakTask, final long now) { return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit); }

创建HikariProxyConnection

//com.zaxxer.hikari.pool.ProxyFactory#getProxyConnection static ProxyConnection getProxyConnection(PoolEntry entry, Connection connection, FastList openStatements, ProxyLeakTask leakTask, long now, boolean isReadOnly, boolean isAutoCommit) { return new HikariProxyConnection(entry, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit); } protected HikariProxyConnection(PoolEntry entry, Connection connection, FastList openStatements, ProxyLeakTask leakTask, long now, boolean isReadOnly, boolean isAutoCommit) { super(entry, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit); }

HikariProxyConnection 继承了 ProxyConnection

protected ProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit) { this.poolEntry = poolEntry; this.delegate = connection; this.openStatements = openStatements; this.leakTask = leakTask; this.lastAccess = now; this.isReadOnly = isReadOnly; this.isAutoCommit = isAutoCommit; } 4、关闭连接

HikariProxyConnection 没有重写 close 方法,因此关闭连接的逻辑在它的父类 ProxyConnection 中。

ProxyConnection 实现了 Connection 接口,并重写了 close 方法:该方法并不真正关闭底层物理连接,而是在清理状态后通过 poolEntry.recycle 将连接归还给连接池。

private int dirtyBits; private boolean isAutoCommit; private boolean isCommitStateDirty; protected Connection delegate; //com.zaxxer.hikari.pool.ProxyConnection#close public final void close() throws SQLException { // Closing statements can cause connection eviction, so this must run before the conditional below //关闭连接绑定的所有statement closeStatements(); //连接不是关闭状态 if (delegate != ClosedConnection.CLOSED_CONNECTION) { //取消连接泄漏检测任务 leakTask.cancel(); try { if (isCommitStateDirty && !isAutoCommit) { //回滚 delegate.rollback(); //更新最后访问时间 lastAccess = currentTime(); LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate); } if (dirtyBits != 0) { //重置连接状态 poolEntry.resetConnectionState(this, dirtyBits); lastAccess = currentTime(); } delegate.clearWarnings(); } catch (SQLException e) { // when connections are aborted, exceptions are often thrown that should not reach the application if (!poolEntry.isMarkedEvicted()) { throw checkException(e); } } finally { delegate = ClosedConnection.CLOSED_CONNECTION; poolEntry.recycle(lastAccess); } } } 4.1、关闭连接绑定的所有statement //实现了List接口 private final FastList openStatements; //com.zaxxer.hikari.pool.ProxyConnection#closeStatements private synchronized void closeStatements() { final int size = openStatements.size(); if (size > 0) { for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) { try (Statement ignored = openStatements.get(i)) { // automatic resource cleanup } catch (SQLException e) { LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()", poolEntry.getPoolName(), delegate); leakTask.cancel(); poolEntry.evict("(exception closing Statements during Connection.close())"); delegate = ClosedConnection.CLOSED_CONNECTION; } } openStatements.clear(); } } 4.2、获取关闭的连接 private static final class ClosedConnection { static final Connection CLOSED_CONNECTION = getClosedConnection(); private static Connection getClosedConnection() { InvocationHandler handler = (proxy, method, args) -> { final String 
methodName = method.getName(); if ("isClosed".equals(methodName)) { return Boolean.TRUE; } else if ("isValid".equals(methodName)) { return Boolean.FALSE; } if ("abort".equals(methodName)) { return Void.TYPE; } if ("close".equals(methodName)) { return Void.TYPE; } else if ("toString".equals(methodName)) { return ClosedConnection.class.getCanonicalName(); } throw new SQLException("Connection is closed"); }; return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler); } } 4.3、归还连接 //com.zaxxer.hikari.pool.PoolEntry#recycle void recycle(final long lastAccessed) { if (connection != null) { this.lastAccessed = lastAccessed; hikariPool.recycle(this); } } //com.zaxxer.hikari.pool.HikariPool#recycle void recycle(final PoolEntry poolEntry) { metricsTracker.recordConnectionUsage(poolEntry); connectionBag.requite(poolEntry); } //com.zaxxer.hikari.util.ConcurrentBag#requite public void requite(final T bagEntry) { //将entry的状态重新置为未使用,这里不需要加锁,因为只有获取到连接的线程才可以释放连接 bagEntry.setState(STATE_NOT_IN_USE); //只要还有线程在等待获取连接,就循环尝试把该连接直接交接给等待者 for (int i = 0; waiters.get() > 0; i++) { //bagEntry.getState() != STATE_NOT_IN_USE 这里是因为又被别的线程获取到连接了 //handoffQueue.offer 返回 true 表示已有等待线程通过 handoffQueue 接收了该连接,归还完成 if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) { return; }//交接未成功:每循环 256 次短暂休眠 10 微秒,其余情况让出 CPU 后重试 else if ((i & 0xff) == 0xff) { parkNanos(MICROSECONDS.toNanos(10)); } else { Thread.yield(); } } //没有等待者时,把连接放入当前线程的 ThreadLocal 列表,供本线程下次 borrow 时优先获取 final List threadLocalList = threadList.get(); if (threadLocalList.size() < 50) { threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); } }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有